package com.gitee.jastee.kafka.transaction;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import com.gitee.jastee.kafka.config.Constant;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

/**
 * Idempotent producer.
 *
 * <p>Limitations of producer idempotence:
 * <ul>
 *   <li>It only guarantees idempotence within a single partition: an idempotent producer can
 *       ensure that one partition of a topic contains no duplicate messages, but it cannot
 *       provide idempotence across multiple partitions.</li>
 *   <li>It only guarantees idempotence within a single session, not across sessions. A
 *       "session" here means one run of the producer process; once the producer process is
 *       restarted, the idempotence guarantee is lost.</li>
 * </ul>
 *
 * @author jast
 * @date 2020/4/21 15:38
 */
public class IdempotenceProducer {

    /** The wrapped Kafka producer; created once in the constructor and never reassigned. */
    private final Producer<String, String> producer;

    /**
     * Builds a {@code String}/{@code String} Kafka producer that connects to
     * {@link Constant#KAFKA_BROKERS} and has idempotence enabled.
     */
    public IdempotenceProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Constant.KAFKA_BROKERS);
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 5);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        // NOTE(review): 1 GiB is far larger than the 32 MiB buffer.memory configured above;
        // confirm this limit is intentional. Value kept unchanged to preserve behavior.
        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1024 * 1024 * 1024);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");

        // Enable producer idempotence; no other setting needs to change.
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

        // Diamond operator instead of the raw type, so key/value types are checked at compile time.
        producer = new KafkaProducer<>(props);
    }

    /**
     * Returns the underlying Kafka producer.
     *
     * <p>This class never closes the producer itself; the caller should close it when done.
     *
     * @return the producer created by the constructor
     */
    public Producer<String, String> getProducer() {
        return producer;
    }

    /**
     * Demo entry point: sends a single record with value {@code "1234"} to topic {@code "test"}
     * and blocks until the send completes.
     *
     * @param args command-line arguments (unused)
     * @throws ExecutionException   if the send fails
     * @throws InterruptedException if the thread is interrupted while waiting for the send result
     */
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // try-with-resources closes the producer even when get() throws.
        try (Producer<String, String> producer = new IdempotenceProducer().getProducer()) {
            ProducerRecord<String, String> record = new ProducerRecord<>("test", "1234");
            producer.send(record).get();
        }
    }

}
